/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.service.engine;

import com.chinamobile.cmss.lakehouse.common.Constants;
import com.chinamobile.cmss.lakehouse.common.engine.DirectEngine;
import com.chinamobile.cmss.lakehouse.common.exception.BaseException;
import com.chinamobile.cmss.lakehouse.common.utils.FileUtils;
import com.chinamobile.cmss.lakehouse.common.utils.PropertyUtils;
import com.chinamobile.cmss.lakehouse.engine.spark.SparkCommandBuilder;

import java.io.File;
import java.nio.file.Paths;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public abstract class SparkEngine {

    private final String sparkHome = PropertyUtils.getString(Constants.ENGINE_SPARK_HOME);
    private final String workerLogDir = PropertyUtils.getString(Constants.ENGINE_SPARK_WORKER_LOG_DIR);

    @Lazy
    @Autowired
    private ThreadPoolTaskExecutor executor;

    protected synchronized String submit(SparkCommandBuilder builder) {
        log.info("Starting spark job with name {} ...", builder.getAppName());
        File logFile = FileUtils.getProcessLogFile(Paths.get(workerLogDir), "spark-test");
        if (logFile == null) {
            throw new BaseException("process log file should not be null");
        }
        executor.submit(new Runnable() {
            @Override
            public void run() {
                DirectEngine engine = new com.chinamobile.cmss.lakehouse.engine.spark.SparkEngine(builder)
                    .setSparkHome(sparkHome);

                Process process = engine.submit(logFile);
                try {
                    int exitCode = process.waitFor();
                    if (exitCode != 0) {
                        log.error("Spark job process {} run failed!", builder.getAppName());
                    } else {
                        log.info("{} run succeed!", builder.getAppName());
                    }
                } catch (InterruptedException e) {
                    log.error("Spark job process {} interrupted!", builder.getAppName());
                    log.error(e.getCause().getMessage());
                }
            }
        });
        return logFile.getAbsolutePath();
    }

    protected synchronized void kill(SparkCommandBuilder builder, String driverPodId) {
        log.warn("Will kill task with pod id: {}", driverPodId);
        executor.submit(new Runnable() {
            @Override
            public void run() {
                DirectEngine engine = new com.chinamobile.cmss.lakehouse.engine.spark.SparkEngine(builder)
                    .setSparkHome(sparkHome);
                engine.kill(driverPodId);
            }
        });
    }

    protected boolean isK8s(String master) {
        assert StringUtils.isNotEmpty(master) : "Master can not be null!";
        return master.startsWith("k8s://");
    }
}
